# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
#pylint: disable=too-many-lines

import asyncio
from collections.abc import AsyncIterator
import datetime
import functools
import logging
import time
import warnings
from enum import Enum
from typing import (
    Any,
    List,
    Optional,
    AsyncIterator as AsyncIteratorType,
    Union,
    TYPE_CHECKING,
    cast
)

from ..exceptions import MessageLockLostError
from ._servicebus_session_async import ServiceBusSession
from ._base_handler_async import BaseHandler
from .._common.message import ServiceBusReceivedMessage
from .._common.receiver_mixins import ReceiverMixin
from .._common.constants import (
    CONSUMER_IDENTIFIER,
    REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
    REQUEST_RESPONSE_PEEK_OPERATION,
    REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
    REQUEST_RESPONSE_RENEWLOCK_OPERATION,
    ServiceBusReceiveMode,
    MGMT_REQUEST_DISPOSITION_STATUS,
    MGMT_REQUEST_LOCK_TOKENS,
    MGMT_REQUEST_SEQUENCE_NUMBERS,
    MGMT_REQUEST_RECEIVER_SETTLE_MODE,
    MGMT_REQUEST_FROM_SEQUENCE_NUMBER,
    MGMT_REQUEST_MAX_MESSAGE_COUNT,
    MESSAGE_COMPLETE,
    MESSAGE_DEAD_LETTER,
    MESSAGE_ABANDON,
    MESSAGE_DEFER,
    MESSAGE_RENEW_LOCK,
    MESSAGE_MGMT_SETTLEMENT_TERM_MAP,
    MGMT_REQUEST_DEAD_LETTER_REASON,
    MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION,
    MGMT_RESPONSE_MESSAGE_EXPIRATION,
)
from .._common import mgmt_handlers
from .._common.utils import utc_from_timestamp
from .._common.tracing import (
    receive_trace_context_manager,
    settle_trace_context_manager,
    get_receive_links,
    get_span_link_from_message,
    SPAN_NAME_RECEIVE_DEFERRED,
    SPAN_NAME_PEEK,
)
from ._async_utils import create_authentication

if TYPE_CHECKING:
    try:
        # pylint:disable=unused-import
        from uamqp.async_ops.client_async import ReceiveClientAsync as uamqp_ReceiveClientAsync
        from uamqp.authentication import JWTTokenAsync as uamqp_JWTTokenAuthAsync
        from uamqp.message import Message as uamqp_Message
    except ImportError:
        pass
    from ._transport._base_async import AmqpTransportAsync
    from .._pyamqp.message import Message as pyamqp_Message
    from .._pyamqp.aio import ReceiveClientAsync as pyamqp_ReceiveClientAsync
    from .._pyamqp.aio._authentication_async import JWTTokenAuthAsync as pyamqp_JWTTokenAuthAsync
    from azure.core.credentials_async import AsyncTokenCredential
    from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
    from .._common.auto_lock_renewer import AutoLockRenewer

_LOGGER = logging.getLogger(__name__)


class ServiceBusReceiver(AsyncIterator, BaseHandler, ReceiverMixin):
    """The ServiceBusReceiver class defines a high level interface for
    receiving messages from the Azure Service Bus Queue or Topic Subscription.

    The two primary channels for message receipt are `receive()` to make a single request for messages,
    and `async for message in receiver:` to continuously receive incoming messages in an ongoing fashion.

    Please use the `get_<queue/subscription>_receiver` method of ~azure.servicebus.aio.ServiceBusClient to create a
    ServiceBusReceiver instance.

    :ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace.
     The namespace format is: `<yournamespace>.servicebus.windows.net`.
    :vartype fully_qualified_namespace: str
    :ivar entity_path: The path of the entity that the client connects to.
    :vartype entity_path: str

    :param str fully_qualified_namespace: The fully qualified host name for the Service Bus namespace.
     The namespace format is: `<yournamespace>.servicebus.windows.net`.
    :param credential: The credential object used for authentication which
     implements a particular interface for getting tokens. It accepts
     credential objects generated by the azure-identity library and objects that implement the
     `get_token(self, *scopes)` method, or alternatively, an AzureSasCredential can be provided too.
    :type credential: ~azure.core.credentials_async.AsyncTokenCredential or ~azure.core.credentials.AzureSasCredential
     or ~azure.core.credentials.AzureNamedKeyCredential
    :keyword str queue_name: The path of specific Service Bus Queue the client connects to.
    :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription
     the client connects to.
    :keyword str subscription_name: The path of specific Service Bus Subscription under the
     specified Topic the client connects to.
    :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
     are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
     lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
     will be immediately removed from the queue, and cannot be subsequently abandoned or re-received
     if the client fails to process the message.
     The default mode is PEEK_LOCK.
    :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
    :keyword Optional[float] max_wait_time:  The timeout in seconds to wait for the first and subsequent
     messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
     until the connection is closed. The default value is None, meaning no timeout. On a sessionful
     queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
     to a session. If connection errors are occurring due to write timing out,the connection timeout
     value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
    :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
    :keyword transport_type: The type of transport protocol that will be used for communicating with
     the Service Bus service. Default is `TransportType.Amqp`.
    :paramtype transport_type: ~azure.servicebus.TransportType
    :keyword Dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
     keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
     Additionally the following keys may also be present: `'username', 'password'`.
    :keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
    :keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.aio.AutoLockRenewer
     can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver,
     it will apply to the session instead.
    :keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
     This setting is only for advanced performance tuning. Increasing this value will improve message throughput
     performance but increase the chance that messages will expire while they are cached if they're not
     processed fast enough.
     The default value is 0, meaning messages will be received from the service and processed one at a time.
     In the case of prefetch_count being 0, `ServiceBusReceiver.receive_messages` would try to cache
     `max_message_count` (if provided) within its request to the service.
     WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in
     the in-memory prefetch buffer until they're received into the application. If the application ends before
     the messages are received into the application, those messages will be lost and unable to be recovered.
     Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
    :keyword str client_identifier: A string-based identifier to uniquely identify the client instance.
     Service Bus will associate it with some error messages for easier correlation of errors. If not specified,
     a unique id will be generated.
    :keyword float socket_timeout: The time in seconds that the underlying socket on the connection should
     wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp
     and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out,
     a larger than default value may need to be passed in.
    """

    def __init__(
        self,
        fully_qualified_namespace: str,
        credential: Union["AsyncTokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"],
        *,
        queue_name: Optional[str] = None,
        topic_name: Optional[str] = None,
        subscription_name: Optional[str] = None,
        receive_mode: Union[
            ServiceBusReceiveMode, str
        ] = ServiceBusReceiveMode.PEEK_LOCK,
        max_wait_time: Optional[float] = None,
        auto_lock_renewer: Optional["AutoLockRenewer"] = None,
        prefetch_count: int = 0,
        **kwargs: Any
    ) -> None:
        self._session_id = None
        self._message_iter: Optional[AsyncIteratorType[Union["uamqp_Message", "pyamqp_Message"]]] = (
            None
        )
        self._amqp_transport: "AmqpTransportAsync"
        if kwargs.get("entity_name"):
            super(ServiceBusReceiver, self).__init__(
                fully_qualified_namespace=fully_qualified_namespace,
                credential=credential,
                queue_name=queue_name,
                topic_name=topic_name,
                subscription_name=subscription_name,
                receive_mode=receive_mode,
                max_wait_time=max_wait_time,
                auto_lock_renewer=auto_lock_renewer,
                prefetch_count=prefetch_count,
                **kwargs
            )
        else:
            if queue_name and topic_name:
                raise ValueError(
                    "Queue/Topic name can not be specified simultaneously."
                )
            if not (queue_name or topic_name):
                raise ValueError(
                    "Queue/Topic name is missing. Please specify queue_name/topic_name."
                )
            if topic_name and not subscription_name:
                raise ValueError(
                    "Subscription name is missing for the topic. Please specify subscription_name."
                )

            entity_name = queue_name or topic_name

            super(ServiceBusReceiver, self).__init__(
                fully_qualified_namespace=fully_qualified_namespace,
                credential=credential,
                entity_name=str(entity_name),
                queue_name=queue_name,
                topic_name=topic_name,
                subscription_name=subscription_name,
                receive_mode=receive_mode,
                max_wait_time=max_wait_time,
                auto_lock_renewer=auto_lock_renewer,
                prefetch_count=prefetch_count,
                **kwargs
            )

        self._populate_attributes(
            queue_name=queue_name,
            topic_name=topic_name,
            subscription_name=subscription_name,
            receive_mode=receive_mode,
            max_wait_time=max_wait_time,
            auto_lock_renewer=auto_lock_renewer,
            prefetch_count=prefetch_count,
            **kwargs
        )
        self._session = (
            None if self._session_id is None else ServiceBusSession(cast(str, self._session_id), self)
        )
        self._receive_context = asyncio.Event()
        self._handler: Union["pyamqp_ReceiveClientAsync", "uamqp_ReceiveClientAsync"]
        self._build_received_message = functools.partial(
            self._amqp_transport.build_received_message,
            self,
            ServiceBusReceivedMessage
        )

        self._iter_contextual_wrapper = functools.partial(
            self._amqp_transport.iter_contextual_wrapper_async, self
        )
        self._iter_next = functools.partial(
            self._amqp_transport.iter_next_async,
            self
        )

    async def __aenter__(self) -> "ServiceBusReceiver":
        if self._shutdown.is_set():
            raise ValueError(
                "The handler has already been shutdown. Please use ServiceBusClient to "
                "create a new instance."
            )
        await self._open_with_retry()
        return self

    def __aiter__(self) -> AsyncIteratorType[ServiceBusReceivedMessage]:
        return self._iter_contextual_wrapper()

    async def _inner_anext(self, wait_time: Optional[float] = None) -> ServiceBusReceivedMessage:
        # We do this weird wrapping such that an imperitive next() call, and a generator-based iter both trace sanely.
        self._check_live()
        while True:
            try:
                return await self._do_retryable_operation(self._iter_next, wait_time=wait_time)
            except StopAsyncIteration:
                self._message_iter = None
                raise

    async def __anext__(self) -> ServiceBusReceivedMessage:
        try:
            self._receive_context.set()
            message = await self._inner_anext()
            links = get_receive_links(message)
            with receive_trace_context_manager(self, links=links):
                return message
        finally:
            self._receive_context.clear()

    @classmethod
    def _from_connection_string(
        cls, conn_str: str, **kwargs: Any
    ) -> "ServiceBusReceiver":
        """Create a ServiceBusReceiver from a connection string.

        :param str conn_str: The connection string of a Service Bus.
        :keyword str queue_name: The path of specific Service Bus Queue the client connects to.
        :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription
         the client connects to.
        :keyword str subscription_name: The path of specific Service Bus Subscription under the
         specified Topic the client connects to.
        :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
         are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
         lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
         will be immediately removed from the queue, and cannot be subsequently abandoned or re-received
         if the client fails to process the message.
         The default mode is PEEK_LOCK.
        :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
        :keyword Optional[float] max_wait_time:  The timeout in seconds to wait for the first and subsequent
         messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
         until the connection is closed. The default value is None, meaning no timeout. On a sessionful
         queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
         to a session. If connection errors are occurring due to write timing out,the connection timeout
         value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
        :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
        :keyword transport_type: The type of transport protocol that will be used for communicating with
         the Service Bus service. Default is `TransportType.Amqp`.
        :paramtype transport_type: ~azure.servicebus.TransportType
        :keyword Dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
         keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
         Additionally the following keys may also be present: `'username', 'password'`.
        :keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
        :keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
         This setting is only for advanced performance tuning. Increasing this value will improve message throughput
         performance but increase the chance that messages will expire while they are cached if they're not
         processed fast enough.
         The default value is 0, meaning messages will be received from the service and processed one at a time.
         In the case of prefetch_count being 0, `ServiceBusReceiver.receive_messages` would try to cache
         `max_message_count` (if provided) within its request to the service.
         WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in
         the in-memory prefetch buffer until they're received into the application. If the application ends before
         the messages are received into the application, those messages will be lost and unable to be recovered.
         Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
        :returns: The ServiceBusReceiver.
        :rtype: ~azure.servicebus.aio.ServiceBusReceiver

        :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
        :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START create_servicebus_receiver_from_conn_str_async]
                :end-before: [END create_servicebus_receiver_from_conn_str_async]
                :language: python
                :dedent: 4
                :caption: Create a new instance of the ServiceBusReceiver from connection string.

        """
        constructor_args = cls._convert_connection_string_to_kwargs(conn_str, **kwargs)
        if kwargs.get("queue_name") and kwargs.get("subscription_name"):
            raise ValueError("Queue entity does not have subscription.")

        if kwargs.get("topic_name") and not kwargs.get("subscription_name"):
            raise ValueError(
                "Subscription name is missing for the topic. Please specify subscription_name."
            )
        return cls(**constructor_args)

    def _create_handler(self, auth: Union["pyamqp_JWTTokenAuthAsync", "uamqp_JWTTokenAuthAsync"]) -> None:

        self._handler = self._amqp_transport.create_receive_client_async(
            receiver=self,
            source=self._get_source(),
            auth=auth,
            network_trace=self._config.logging_enable,
            properties=self._properties,
            retry_policy=self._error_policy,
            client_name=self._name,
            receive_mode=self._receive_mode,
            timeout=self._max_wait_time * self._amqp_transport.TIMEOUT_FACTOR
            if self._max_wait_time
            else 0,
            # set link_credit to at least 1 so that messages can be received
            link_credit=self._prefetch_count + 1,
            # If prefetch is 0, then keep_alive coroutine frequently listens on the connection for messages and
            # releases right away, since no "prefetched" messages should be in the internal buffer.
            keep_alive_interval=self._config.keep_alive
            if self._prefetch_count != 0
            else 5,
            shutdown_after_timeout=False,
            link_properties = {CONSUMER_IDENTIFIER:self._name}
        )
        # When prefetch is 0 and receive mode is PEEK_LOCK, release messages when they're received.
        # This will stop messages from expiring in the buffer and incrementing delivery count of a message.
        # If RECEIVE_AND_DELETE mode, messages are settled and removed from the Service Bus entity immediately,
        # so the regular _message_received callback should be used. This will ensure that all messages are added
        # to the internal buffer since they cannot be re-received, even if not received during an active receive call.
        if self._prefetch_count == 0 and self._receive_mode == ServiceBusReceiveMode.PEEK_LOCK:
            # pylint: disable=protected-access
            self._amqp_transport.set_handler_message_received_async(self)

    async def _open(self) -> None:
        # pylint: disable=protected-access
        if self._running:
            return
        if self._handler and not self._handler._shutdown:
            await self._handler.close_async()
        auth = None if self._connection else (await create_authentication(self))
        self._create_handler(auth)
        try:
            await self._handler.open_async(connection=self._connection)
            while not await self._handler.client_ready_async():
                await asyncio.sleep(0.05)
            self._running = True
        except:
            await self._close_handler()
            raise

        if self._auto_lock_renewer and self._session:
            self._auto_lock_renewer.register(self, self.session)

    async def _receive(
        self, max_message_count: Optional[int] = None, timeout: Optional[float] = None
    ) -> List[ServiceBusReceivedMessage]:
        # pylint: disable=protected-access
        try:
            self._receive_context.set()
            await self._open()

            amqp_receive_client = self._handler
            received_messages_queue = amqp_receive_client._received_messages
            max_message_count = max_message_count or self._prefetch_count
            timeout_seconds = (
                self._amqp_transport.TIMEOUT_FACTOR * (timeout or self._max_wait_time)
                if (timeout or self._max_wait_time)
                else 0
            )
            abs_timeout = (
                self._amqp_transport.get_current_time(amqp_receive_client) + timeout_seconds
                if timeout_seconds
                else 0
            )

            batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = []

            while not received_messages_queue.empty() and len(batch) < max_message_count:
                batch.append(received_messages_queue.get())
                received_messages_queue.task_done()
            if len(batch) >= max_message_count:
                return [self._build_received_message(message) for message in batch]

            # Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0
            if max_message_count and self._prefetch_count == 0 and max_message_count >= 1:
                link_credit_needed = max_message_count - len(batch)
                await self._amqp_transport.reset_link_credit_async(amqp_receive_client, link_credit_needed)

            first_message_received = expired = False
            receiving = True
            while receiving and not expired and len(batch) < max_message_count:
                while receiving and received_messages_queue.qsize() < max_message_count:
                    if (
                        abs_timeout
                        and self._amqp_transport.get_current_time(amqp_receive_client)
                        > abs_timeout
                    ):
                        expired = True
                        break
                    before = received_messages_queue.qsize()
                    receiving = await amqp_receive_client.do_work_async()
                    received = received_messages_queue.qsize() - before
                    if (
                        not first_message_received
                        and received_messages_queue.qsize() > 0
                        and received > 0
                    ):
                        # first message(s) received, continue receiving for some time
                        first_message_received = True
                        abs_timeout = (
                            self._amqp_transport.get_current_time(amqp_receive_client)
                            + self._further_pull_receive_timeout
                        )
                while (
                    not received_messages_queue.empty() and len(batch) < max_message_count
                ):
                    batch.append(received_messages_queue.get())
                    received_messages_queue.task_done()
            return [self._build_received_message(message) for message in batch]
        finally:
            self._receive_context.clear()

    async def _settle_message_with_retry(
        self,
        message,
        settle_operation,
        dead_letter_reason=None,
        dead_letter_error_description=None,
    ):
        # pylint: disable=protected-access
        self._check_live()
        if not isinstance(message, ServiceBusReceivedMessage):
            raise TypeError(
                "Parameter 'message' must be of type ServiceBusReceivedMessage"
            )
        self._check_message_alive(message, settle_operation)

        # The following condition check is a hot fix for settling a message received for non-session queue after
        # lock expiration.
        # pyamqp doesn't currently (and uamqp doesn't have the ability to) wait to receive disposition result returned
        # from the service after settlement, so there's no way we could tell whether a disposition succeeds or not and
        # there's no error condition info. (for uamqp, see issue: https://github.com/Azure/azure-uamqp-c/issues/274)
        if not self._session and message._lock_expired:
            raise MessageLockLostError(
                message="The lock on the message lock has expired.",
                error=message.auto_renew_error,
            )
        link = get_span_link_from_message(message)
        trace_links = [link] if link else []
        with settle_trace_context_manager(self, settle_operation, links=trace_links):
            await self._do_retryable_operation(
                self._settle_message,
                timeout=None,
                message=message,
                settle_operation=settle_operation,
                dead_letter_reason=dead_letter_reason,
                dead_letter_error_description=dead_letter_error_description,
            )
            message._settled = True

    async def _settle_message(  # type: ignore
        self,
        message: ServiceBusReceivedMessage,
        settle_operation: str,
        dead_letter_reason: Optional[str] = None,
        dead_letter_error_description: Optional[str] = None,
    ):
        # pylint: disable=protected-access
        try:
            if not message._is_deferred_message:
                try:
                    await self._amqp_transport.settle_message_via_receiver_link_async(
                        self._handler,
                        message,
                        settle_operation,
                        dead_letter_reason=dead_letter_reason,
                        dead_letter_error_description=dead_letter_error_description,
                    )
                    return
                except RuntimeError as exception:
                    _LOGGER.info(
                        "Message settling: %r has encountered an exception (%r)."
                        "Trying to settle through management link",
                        settle_operation,
                        exception,
                    )
            dead_letter_details = (
                {
                    MGMT_REQUEST_DEAD_LETTER_REASON: dead_letter_reason or "",
                    MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description
                    or "",
                }
                if settle_operation == MESSAGE_DEAD_LETTER
                else None
            )
            await self._settle_message_via_mgmt_link(
                MESSAGE_MGMT_SETTLEMENT_TERM_MAP[settle_operation],
                [message.lock_token],
                dead_letter_details=dead_letter_details,
            )
        except Exception as exception:
            _LOGGER.info(
                "Message settling: %r has encountered an exception (%r) through management link",
                settle_operation,
                exception,
            )
            raise

    async def _settle_message_via_mgmt_link(
        self, settlement, lock_tokens, dead_letter_details=None
    ):
        message = {
            MGMT_REQUEST_DISPOSITION_STATUS: settlement,
            MGMT_REQUEST_LOCK_TOKENS: self._amqp_transport.AMQP_ARRAY_VALUE(lock_tokens),
        }

        self._populate_message_properties(message)
        if dead_letter_details:
            message.update(dead_letter_details)

        return await self._mgmt_request_response_with_retry(
            REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, message, mgmt_handlers.default
        )

    async def _renew_locks(self, *lock_tokens: str, timeout: Optional[float] = None) -> Any:
        message = {MGMT_REQUEST_LOCK_TOKENS: self._amqp_transport.AMQP_ARRAY_VALUE(lock_tokens)}
        return await self._mgmt_request_response_with_retry(
            REQUEST_RESPONSE_RENEWLOCK_OPERATION,
            message,
            mgmt_handlers.message_lock_renew_op,
            timeout=timeout,
        )

    async def _close_handler(self):
        self._message_iter = None
        await super(ServiceBusReceiver, self)._close_handler()

    @property
    def session(self) -> ServiceBusSession:
        """
        Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled
        entities, it would return None if called on a non-sessionful receiver.

        :rtype: ~azure.servicebus.aio.ServiceBusSession

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START get_session_async]
                :end-before: [END get_session_async]
                :language: python
                :dedent: 4
                :caption: Get session from a receiver
        """
        return self._session  # type: ignore

    async def close(self) -> None:
        await super(ServiceBusReceiver, self).close()
        self._message_iter = None

    def _get_streaming_message_iter(
        self, max_wait_time: Optional[float] = None
    ) -> AsyncIteratorType[ServiceBusReceivedMessage]:
        """Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
        such a timeout occurs.

        :param Optional[float] max_wait_time: Maximum time to wait in seconds for the next message to arrive.
         If no messages arrive, and no timeout is specified, this call will not return
         until the connection is closed. If specified, and no messages arrive for the
         timeout period, the iterator will stop.
        :return: An async iterator of messages.
        :rtype asynciterator[~azure.servicebus.ServiceBusReceivedMessage]

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START receive_forever_async]
                :end-before: [END receive_forever_async]
                :language: python
                :dedent: 4
                :caption: Receive indefinitely from an iterator in streaming fashion.
        """
        if max_wait_time is not None and max_wait_time <= 0:
            raise ValueError("The max_wait_time must be greater than 0.")
        return self._iter_contextual_wrapper(max_wait_time)

    async def receive_messages(
        self,
        max_message_count: Optional[int] = 1,
        max_wait_time: Optional[float] = None,
    ) -> List[ServiceBusReceivedMessage]:
        """Receive a batch of messages at once.

        This approach is optimal if you wish to process multiple messages simultaneously, or
        perform an ad-hoc receive as a single call.

        Note that the number of messages retrieved in a single batch will be dependent on
        whether `prefetch_count` was set for the receiver. If `prefetch_count` is not set for the receiver,
        the receiver would try to cache max_message_count (if provided) messages within the request to the service.

        This call will prioritize returning quickly over meeting a specified batch size, and so will
        return as soon as at least one message is received and there is a gap in incoming messages regardless
        of the specified batch size.

        :param Optional[int] max_message_count: Maximum number of messages in the batch. Actual number
         returned will depend on prefetch_count size and incoming stream rate.
         Setting to None will fully depend on the prefetch config. The default value is 1.
        :param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
         If no messages arrive, and no timeout is specified, this call will not return
         until the connection is closed. If specified, and no messages arrive within the
         timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
         when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
         Please use max_wait_time on the constructor to set the timeout for connecting to a session.
        :return: A list of messages received. If no messages are available, this will be an empty list.
        :rtype: list[~azure.servicebus.aio.ServiceBusReceivedMessage]

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START receive_async]
                :end-before: [END receive_async]
                :language: python
                :dedent: 4
                :caption: Receive messages from ServiceBus.

        """
        self._check_live()
        if max_wait_time is not None and max_wait_time <= 0:
            raise ValueError("The max_wait_time must be greater than 0.")
        if max_message_count is not None and max_message_count <= 0:
            raise ValueError("The max_message_count must be greater than 0")
        start_time = time.time_ns()
        messages: List[ServiceBusReceivedMessage] = await self._do_retryable_operation(
            self._receive,
            max_message_count=max_message_count,
            timeout=max_wait_time,
            operation_requires_timeout=True,
        )
        links = get_receive_links(messages)
        with receive_trace_context_manager(self, links=links, start_time=start_time):
            if (
                self._auto_lock_renewer
                and not self._session
                and self._receive_mode != ServiceBusReceiveMode.RECEIVE_AND_DELETE
            ):
                for message in messages:
                    self._auto_lock_renewer.register(self, message)
            return messages

    async def receive_deferred_messages(
        self, sequence_numbers: Union[int, List[int]], *, timeout: Optional[float] = None, **kwargs: Any
    ) -> List[ServiceBusReceivedMessage]:
        """Receive messages that have previously been deferred.

        When receiving deferred messages from a partitioned entity, all of the supplied
        sequence numbers must be messages from the same partition.

        :param Union[int, list[int]] sequence_numbers: A list of the sequence numbers of messages that have been
         deferred.
        :keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
         The value must be greater than 0 if specified. The default value is None, meaning no timeout.
        :returns: A list of the received messages.
        :rtype: list[~azure.servicebus.aio.ServiceBusReceivedMessage]

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START receive_defer_async]
                :end-before: [END receive_defer_async]
                :language: python
                :dedent: 4
                :caption: Receive deferred messages from ServiceBus.

        """
        if kwargs:
            warnings.warn(f"Unsupported keyword args: {kwargs}")
        self._check_live()
        if timeout is not None and timeout <= 0:
            raise ValueError("The timeout must be greater than 0.")
        if isinstance(sequence_numbers, int):
            sequence_numbers = [sequence_numbers]
        sequence_numbers = cast(List[int], sequence_numbers)
        if len(sequence_numbers) == 0:
            return []  # no-op on empty list.
        await self._open()
        uamqp_receive_mode = self._amqp_transport.ServiceBusToAMQPReceiveModeMap[self._receive_mode]
        try:
            receive_mode = cast(Enum, uamqp_receive_mode).value
        except AttributeError:
            receive_mode = int(uamqp_receive_mode)
        message = {
            MGMT_REQUEST_SEQUENCE_NUMBERS: self._amqp_transport.AMQP_ARRAY_VALUE(
                [self._amqp_transport.AMQP_LONG_VALUE(s) for s in sequence_numbers]
            ),
            MGMT_REQUEST_RECEIVER_SETTLE_MODE: self._amqp_transport.AMQP_UINT_VALUE(receive_mode),
        }

        self._populate_message_properties(message)

        handler = functools.partial(
            mgmt_handlers.deferred_message_op,
            receive_mode=self._receive_mode,
            receiver=self,
            amqp_transport=self._amqp_transport,
        )
        start_time = time.time_ns()
        messages = await self._mgmt_request_response_with_retry(
            REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
            message,
            handler,
            timeout=timeout,
        )
        links = get_receive_links(messages)
        with receive_trace_context_manager(
            self, span_name=SPAN_NAME_RECEIVE_DEFERRED, links=links, start_time=start_time
        ):
            if (
                self._auto_lock_renewer
                and not self._session
                and self._receive_mode != ServiceBusReceiveMode.RECEIVE_AND_DELETE
            ):
                for message in messages:
                    self._auto_lock_renewer.register(self, message)
            return messages

    async def peek_messages(
        self, max_message_count: int = 1, *, sequence_number: int = 0, timeout: Optional[float] = None, **kwargs: Any
    ) -> List[ServiceBusReceivedMessage]:
        """Browse messages currently pending in the queue.

        Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
        deferred or dead-lettered.

        :param int max_message_count: The maximum number of messages to try and peek. The default
         value is 1.
        :keyword int sequence_number: A message sequence number from which to start browsing messages.
        :keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
         The value must be greater than 0 if specified. The default value is None, meaning no timeout.
        :return: A list of ~azure.servicebus.ServiceBusReceivedMessage objects.
        :rtype: list[~azure.servicebus.ServiceBusReceivedMessage]

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START peek_messages_async]
                :end-before: [END peek_messages_async]
                :language: python
                :dedent: 4
                :caption: Peek messages in the queue.
        """
        if kwargs:
            warnings.warn(f"Unsupported keyword args: {kwargs}")
        self._check_live()
        if timeout is not None and timeout <= 0:
            raise ValueError("The timeout must be greater than 0.")
        if not sequence_number:
            sequence_number = self._last_received_sequenced_number or 1
        if int(max_message_count) < 0:
            raise ValueError("max_message_count must be 1 or greater.")

        await self._open()

        message = {
            MGMT_REQUEST_FROM_SEQUENCE_NUMBER: self._amqp_transport.AMQP_LONG_VALUE(sequence_number),
            MGMT_REQUEST_MAX_MESSAGE_COUNT: max_message_count,
        }

        self._populate_message_properties(message)
        handler = functools.partial(mgmt_handlers.peek_op, receiver=self, amqp_transport=self._amqp_transport)
        start_time = time.time_ns()
        messages = await self._mgmt_request_response_with_retry(
            REQUEST_RESPONSE_PEEK_OPERATION, message, handler, timeout=timeout
        )
        links = get_receive_links(messages)
        with receive_trace_context_manager(
            self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time
        ):
            return messages

    async def complete_message(self, message: ServiceBusReceivedMessage) -> None:
        """Complete the message.

        This removes the message from the queue.

        :param message: The received message to be completed.
        :type message: ~azure.servicebus.ServiceBusReceivedMessage
        :rtype: None
        :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
        :raises: ~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
        :raises: ~azure.servicebus.exceptions.ServiceBusError when errors happen.

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START complete_message_async]
                :end-before: [END complete_message_async]
                :language: python
                :dedent: 4
                :caption: Complete a received message.

        """
        await self._settle_message_with_retry(message, MESSAGE_COMPLETE)

    async def abandon_message(self, message: ServiceBusReceivedMessage) -> None:
        """Abandon the message.

        This message will be returned to the queue and made available to be received again.

        :param message: The received message to be abandoned.
        :type message: ~azure.servicebus.ServiceBusReceivedMessage
        :rtype: None
        :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
        :raises: ~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
        :raises: ~azure.servicebus.exceptions.ServiceBusError when errors happen.

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START abandon_message_async]
                :end-before: [END abandon_message_async]
                :language: python
                :dedent: 4
                :caption: Abandon a received message.

        """
        await self._settle_message_with_retry(message, MESSAGE_ABANDON)

    async def defer_message(self, message: ServiceBusReceivedMessage) -> None:
        """Defers the message.

        This message will remain in the queue but must be requested
        specifically by its sequence number in order to be received.

        :param message: The received message to be deferred.
        :type message: ~azure.servicebus.ServiceBusReceivedMessage
        :rtype: None
        :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
        :raises: ~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
        :raises: ~azure.servicebus.exceptions.ServiceBusError when errors happen.

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START defer_message_async]
                :end-before: [END defer_message_async]
                :language: python
                :dedent: 4
                :caption: Defer a received message.

        """
        await self._settle_message_with_retry(message, MESSAGE_DEFER)

    async def dead_letter_message(
        self,
        message: ServiceBusReceivedMessage,
        reason: Optional[str] = None,
        error_description: Optional[str] = None
    ) -> None:
        """Move the message to the Dead Letter queue.

        The Dead Letter queue is a sub-queue that can be
        used to store messages that failed to process correctly, or otherwise require further inspection
        or processing. The queue can also be configured to send expired messages to the Dead Letter queue.

        :param message: The received message to be dead-lettered.
        :type message: ~azure.servicebus.ServiceBusReceivedMessage
        :param Optional[str] reason: The reason for dead-lettering the message.
        :param Optional[str] error_description: The detailed error description for dead-lettering the message.
        :rtype: None
        :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
        :raises: ~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
        :raises: ~azure.servicebus.exceptions.ServiceBusError when errors happen.

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START dead_letter_message_async]
                :end-before: [END dead_letter_message_async]
                :language: python
                :dedent: 4
                :caption: Dead letter a received message.

        """
        await self._settle_message_with_retry(
            message,
            MESSAGE_DEAD_LETTER,
            dead_letter_reason=reason,
            dead_letter_error_description=error_description,
        )

    async def renew_message_lock(
        self, message: ServiceBusReceivedMessage, *, timeout: Optional[float] = None, **kwargs: Any
    ) -> datetime.datetime:
        # pylint: disable=protected-access,no-member
        """Renew the message lock.

        This will maintain the lock on the message to ensure it is not returned to the queue
        to be reprocessed.

        In order to complete (or otherwise settle) the message, the lock must be maintained,
        and cannot already have expired; an expired lock cannot be renewed.

        Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed.
        This operation is only available for non-sessionful messages as well.

        :param message: The message to renew the lock for.
        :type message: ~azure.servicebus.ServiceBusReceivedMessage
        :keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
         The value must be greater than 0 if specified. The default value is None, meaning no timeout.
        :returns: The utc datetime the lock is set to expire at.
        :rtype: datetime.datetime
        :raises: TypeError if the message is sessionful.
        :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
        :raises: ~azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

        .. admonition:: Example:

            .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
                :start-after: [START renew_message_lock_async]
                :end-before: [END renew_message_lock_async]
                :language: python
                :dedent: 4
                :caption: Renew the lock on a received message.

        """
        if kwargs:
            warnings.warn(f"Unsupported keyword args: {kwargs}")
        try:
            if self.session:
                raise TypeError(
                    "Renewing message lock is an invalid operation when working with sessions."
                    "Please renew the session lock instead."
                )
        except AttributeError:
            pass

        self._check_live()
        self._check_message_alive(message, MESSAGE_RENEW_LOCK)
        token = message.lock_token
        if not token:
            raise ValueError("Unable to renew lock - no lock token found.")

        if timeout is not None and timeout <= 0:
            raise ValueError("The timeout must be greater than 0.")

        expiry = await self._renew_locks(token, timeout=timeout)  # type: ignore
        message._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0] / 1000.0)  # type: ignore

        return message._expiry  # type: ignore

    @property
    def client_identifier(self) -> str:
        """
        Get the ServiceBusReceiver client identifier associated with the receiver instance.

        :rtype: str
        """
        return self._name

    def __str__(self) -> str:
        return f"Receiver client id: {self.client_identifier}, entity: {self.entity_path}"
